package com.bw.gmall.realtime.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

import com.bw.gmall.realtime.bean.TrafficPageViewBean;
import com.bw.gmall.realtime.utils.DateFormatUtil;
import com.bw.gmall.realtime.utils.MyClickHouseUtil;
import com.bw.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.time.Duration;
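/*
 * Upstream DWD data consumed by this job:
 *   - dwd unique visitors
 *   - dwd user jumps (bounces)
 *   - dwd split-stream output (page log)
 */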
/**
 * Flink streaming job that merges three DWD Kafka topics (unique-visitor detail,
 * user-jump detail and page log) into one stream of {@link TrafficPageViewBean},
 * aggregates the counters in 10-second tumbling event-time windows keyed by
 * (ar, ch, is_new, vc), and writes each window result to ClickHouse.
 *
 * <p>NOTE(review): the positional constructor arguments of {@code TrafficPageViewBean}
 * are used below as (stt, edt, vc, ch, ar, isNew, uvCt, svCt, pvCt, durSum, ujCt, ts);
 * that order is inferred from how this file fills them - confirm against the bean class.
 */
public class DwsTrafficVcChArIsNewPageViewWindow {

    /**
     * Builds and launches the job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {

        // 1. Obtain the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Create one source stream per input topic.
        String uvTopic = "dwd_traffic_unique_visitor_detail"; // unique visitors
        String ujdTopic = "dwd_traffic_user_jump_detail";     // user jumps (bounces)
        String topic = "dwd_traffic_page_log";                // page log: sessions, page views, total duration
        String groupId = "vccharisnew_pageview_window_2103a";

        // Every input record is mapped to the same bean shape; only the counters differ:
        //   unique-visitor record -> uv = 1, everything else 0
        //   user-jump record      -> uj = 1, everything else 0
        //   page-log record       -> pv = 1, sv = 1 only when last_page_id is null,
        //                            durSum = during_time
        DataStreamSource<String> uvDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(uvTopic, groupId));

        DataStreamSource<String> ujDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(ujdTopic, groupId));

        DataStreamSource<String> pageDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId));

        // 3. Normalise the three streams to TrafficPageViewBean.
        // Unique visitors: each record contributes uv = 1.
        SingleOutputStreamOperator<TrafficPageViewBean> trafficPageViewWithUvDS = uvDS.map(line -> {
            JSONObject jsonObject = JSON.parseObject(line);
            JSONObject common = jsonObject.getJSONObject("common");

            return new TrafficPageViewBean(null, null,
                    common.getString("vc"),
                    common.getString("ch"),
                    common.getString("ar"),
                    common.getString("is_new"),
                    1L, 0L, 0L, 0L, 0L,
                    jsonObject.getLong("ts"));
        });

        // User jumps: each record contributes uj = 1 (last counter position).
        SingleOutputStreamOperator<TrafficPageViewBean> trafficPageViewWithUjDS = ujDS.map(line -> {
            JSONObject jsonObject = JSON.parseObject(line);
            JSONObject common = jsonObject.getJSONObject("common");

            return new TrafficPageViewBean(null, null,
                    common.getString("vc"),
                    common.getString("ch"),
                    common.getString("ar"),
                    common.getString("is_new"),
                    0L, 0L, 0L, 0L, 1L,
                    jsonObject.getLong("ts"));
        });

        // Page log: session count (1 when last_page_id is null), page view count (always 1)
        // and browsing duration (during_time).
        SingleOutputStreamOperator<TrafficPageViewBean> trafficPageViewWithPageDS = pageDS.map(line -> {
            JSONObject jsonObject = JSON.parseObject(line);
            JSONObject common = jsonObject.getJSONObject("common");
            JSONObject page = jsonObject.getJSONObject("page");

            // A page without a predecessor starts a new session.
            long sv = page.getString("last_page_id") == null ? 1L : 0L;

            // getLong returns null when the field is absent; default to 0 so the
            // window reduce below never unboxes a null duration.
            Long duringTime = page.getLong("during_time");
            long durSum = duringTime == null ? 0L : duringTime;

            return new TrafficPageViewBean(null, null,
                    common.getString("vc"),     // version
                    common.getString("ch"),     // channel
                    common.getString("ar"),     // area
                    common.getString("is_new"), // new vs. returning user flag
                    0L,     // unique visitors
                    sv,     // sessions
                    1L,     // page views
                    durSum, // browsing duration
                    0L,     // user jumps
                    jsonObject.getLong("ts"));
        });

        // Debug output, one print per normalised stream.
        trafficPageViewWithUjDS.print(">>>>>>>ujCTujCTujCT>>>>>>>>>>>>>");
        trafficPageViewWithUvDS.print("独立会话数");
        // Fixed: this label previously printed the unique-visitor stream a second time.
        trafficPageViewWithPageDS.print("---page--->");

        // 4. Union the three streams.
        DataStream<TrafficPageViewBean> unionDS = trafficPageViewWithUvDS.union(
                trafficPageViewWithUjDS,
                trafficPageViewWithPageDS);

        // 5. Assign event-time timestamps (bean ts) and watermarks with 5s out-of-orderness.
        // NOTE(review): if any input topic is produced with a built-in delay (for example a
        // timeout-based upstream job), 5s may be too small for it - confirm against upstream.
        SingleOutputStreamOperator<TrafficPageViewBean> trafficPageViewWithWmDS =
                unionDS.assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<TrafficPageViewBean>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner(new SerializableTimestampAssigner<TrafficPageViewBean>() {
                                    @Override
                                    public long extractTimestamp(TrafficPageViewBean element, long recordTimestamp) {
                                        return element.getTs();
                                    }
                                }));
        trafficPageViewWithWmDS.print("查看水位线------>>>>");

        // 6. Key by (ar, ch, is_new, vc), open 10s tumbling event-time windows, aggregate.
        // NOTE(review): with allowed lateness, a late record re-fires its window, so the sink
        // may receive more than one row for the same window - confirm the target table copes.
        WindowedStream<TrafficPageViewBean, Tuple4<String, String, String, String>, TimeWindow> windowedStream =
                trafficPageViewWithWmDS
                        .keyBy(new KeySelector<TrafficPageViewBean, Tuple4<String, String, String, String>>() {
                            @Override
                            public Tuple4<String, String, String, String> getKey(TrafficPageViewBean value) throws Exception {
                                return new Tuple4<>(value.getAr(),
                                        value.getCh(),
                                        value.getIsNew(),
                                        value.getVc());
                            }
                        })
                        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                        .allowedLateness(Time.seconds(10L));

        SingleOutputStreamOperator<TrafficPageViewBean> resultDS = windowedStream.reduce(
                new ReduceFunction<TrafficPageViewBean>() {
                    // Incrementally sums all five counters into the first bean.
                    @Override
                    public TrafficPageViewBean reduce(TrafficPageViewBean value1, TrafficPageViewBean value2) throws Exception {
                        value1.setSvCt(value1.getSvCt() + value2.getSvCt());
                        value1.setUvCt(value1.getUvCt() + value2.getUvCt());
                        value1.setUjCt(value1.getUjCt() + value2.getUjCt());
                        value1.setPvCt(value1.getPvCt() + value2.getPvCt());
                        value1.setDurSum(value1.getDurSum() + value2.getDurSum());
                        return value1;
                    }
                },
                new WindowFunction<TrafficPageViewBean, TrafficPageViewBean, Tuple4<String, String, String, String>, TimeWindow>() {
                    // Decorates the single pre-aggregated bean with the window bounds.
                    @Override
                    public void apply(Tuple4<String, String, String, String> key, TimeWindow window,
                                      Iterable<TrafficPageViewBean> input, Collector<TrafficPageViewBean> out) throws Exception {

                        // The reduce above leaves exactly one element per key and window.
                        TrafficPageViewBean next = input.iterator().next();

                        // Fill in window start and end.
                        next.setStt(new Timestamp(window.getStart()));
                        next.setEdt(new Timestamp(window.getEnd()));

                        // Replace the event timestamp with the current processing time.
                        next.setTs(System.currentTimeMillis());

                        out.collect(next);
                    }
                });

        // 7. Write the results to ClickHouse.
        resultDS.print(">>>>>>>>");
        resultDS.addSink(MyClickHouseUtil.getSinkFunction("insert into dws_traffic_vc_ch_ar_is_new_page_view_window values(?,?,?,?,?,?,?,?,?,?,?,?)"));

        // 8. Launch the job.
        env.execute("DwsTrafficVcChArIsNewPageViewWindow");

    }

}
